Processing Pipeline

Overview

The Voice Features processing pipeline transforms uploaded audio files through a series of automated stages, from initial upload to final result storage with complete metadata preservation. The pipeline supports multi-agent execution, comprehensive error handling, and real-time status notifications.

Pipeline Architecture

Core Components

  • ConversationProcessingService: Main orchestrator for all processing stages
  • ConversationStore: Manages conversation data and metadata
  • AutomationStore: Handles automation rules and client-specific configurations
  • AutomationRunResults: Stores processing outcomes and agent results
  • TranscriptionServiceFactory: Provides transcription services with multiple providers
  • LangflowService: Executes LangFlow agents for content analysis
  • QueueServiceClient: Manages task queuing and concurrent processing
  • SocketNotificationService: Real-time status updates to clients

Pipeline Stages

Stage 1: Upload Confirmation

Status: "Uploading"

Operations

  • File Validation: Format, size, and security checks
  • Metadata Extraction: Duration, MIME type, and audio properties
  • Checksum Calculation: SHA-256 hash for integrity verification
  • Storage Allocation: File placement in appropriate storage backend
  • Database Record Creation: Initial conversation entry with ConversationSchema
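
The checksum step can be sketched with Node's built-in crypto module. This is a minimal illustration; the actual service may hash the upload stream incrementally rather than an in-memory buffer, and `computeChecksum` is a hypothetical helper name:

```typescript
import { createHash } from "crypto";

// Compute a SHA-256 hex digest for integrity verification.
// (Illustrative sketch: hashes an in-memory value for brevity.)
function computeChecksum(data: Buffer | string): string {
  return createHash("sha256").update(data).digest("hex");
}
```

The resulting 64-character hex string is what lands in the `file_checksum` field shown below.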

Data Flow

// Input from confirm-upload endpoint
{
  "filename": "meeting_recording.mp3",
  "mediakey": "audio/2024/01/meeting_abc123.mp3",
  "file_checksum": "a1b2c3d4e5f6...",
  "duration": "00:30:47",
  "size": 15728640,
  "contentType": "audio/mpeg",
  "folderId": 123
}

// Created ConversationSchema
{
  "id": "conv_uuid",
  "timestamp": 1705934400,
  "duration": "00:30:47",
  "transcript": "",            // Initially empty
  "custom_info": null,
  "name": "meeting_recording.mp3",
  "user_id": "user_123",
  "runresultsreference": null, // Set after processing
  "media_key": "audio/2024/01/meeting_abc123.mp3",
  "file_checksum": "a1b2c3d4e5f6...",
  "folder_id": 123,
  "blob_source": "azure"
}

Stage 2: Task Queue Management

Status: "Pending" → "Processing"

Queue Operations

  • Task Creation: ConversationProcessingTask with all metadata
  • Concurrency Control: Maximum concurrent tasks enforcement (maxConcurrentTasks)
  • Priority Processing: FIFO queue with client isolation
  • Status Tracking: Real-time task status updates

Task Structure

interface ConversationProcessingTask {
  id: string;           // Processing ID (UUID)
  status: TaskStatus;   // PENDING, PROCESSING, COMPLETED, FAILED
  type: "CONVERSATION_PROCESSING";
  timestamp: number;
  clientId: string;
  payload: {
    mediaKey: string;
    folderId: number | null;
    source: StorageType; // "azure", "s3", "local"
    conversationId: string;
    type: string;        // "audio"
    origin: string;      // Upload source
    metadata: AudioMetadata;
    provider: string;    // Transcription provider
    serviceModel?: string;
  };
}

Notification Events

// Queue status notifications
ConversationNotificationType.PENDING // Task queued
ConversationNotificationType.PREPARE // Task preparation
ConversationNotificationType.START // Processing started

Stage 3: Media Retrieval

Status: "Processing" - Download Phase

Smart Download Strategy

The service implements intelligent media retrieval:

// Option 1: Direct URL (preferred for supported services)
if (supportsUrl && storageSupportsDirectUrl) {
  directUrl = await storageService.generateDownloadUrl({
    containerName: this.containerName,
    mediaKey,
    expiresInMs: 15 * 60_000, // 15 minutes
  });
}

// Option 2: Blob download (fallback)
if (!directUrl) {
  blobData = await this.retrieveMediaFile(mediaKey, source, clientId, taskId);
}

Storage Service Integration

  • Azure Blob Storage: Primary storage with SAS token generation
  • S3 Compatible: Alternative cloud storage
  • Local Storage: Development and testing environments
  • Multi-backend Support: Automatic service selection based on blob_source
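
Backend selection keyed on `blob_source` might look like the following sketch. The registry shape and `selectStorage` helper are assumptions for illustration; the actual factory is not shown in this document:

```typescript
type StorageType = "azure" | "s3" | "local";

// Minimal stand-in for a storage backend; the real services expose
// download/upload operations rather than just a name.
interface StorageService {
  name: string;
}

const backends: Record<StorageType, StorageService> = {
  azure: { name: "AzureBlobStorage" },
  s3:    { name: "S3Storage" },
  local: { name: "LocalStorage" },
};

// Pick the backend matching blob_source, falling back to local storage
// for unrecognized values.
function selectStorage(blobSource: string): StorageService {
  return backends[blobSource as StorageType] ?? backends.local;
}
```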

Error Handling

  • File Not Found: Clear error messages with retry suggestions
  • Network Issues: Exponential backoff retry logic
  • Permission Errors: Authentication failure notifications
  • Timeout Handling: Configurable download timeouts
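
The exponential-backoff behavior described above can be sketched with a generic retry wrapper. `withRetry` and its defaults are hypothetical; the service's actual retry parameters are not documented here:

```typescript
// Retry a transient operation with exponential backoff:
// delays of baseDelayMs, 2x, 4x, ... between attempts.
async function withRetry<T>(
  fn: () => Promise<T>,
  maxAttempts = 3,
  baseDelayMs = 500
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      return await fn();
    } catch (err) {
      lastError = err;
      const delay = baseDelayMs * 2 ** attempt;
      await new Promise((resolve) => setTimeout(resolve, delay));
    }
  }
  throw lastError;
}
```

A download call would then be wrapped as `withRetry(() => storageService.download(mediaKey))`.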

Stage 4: Transcription Processing

Status: "Transcribing"

Provider-Agnostic Transcription

// Service selection based on provider
const transcriptionService = this.transcriptionServiceFactory.getService(provider);

// Dual transcription modes
if (directUrl && transcriptionService.supportsUrlTranscription()) {
  operationLocation = await transcriptionService.transcribeFromUrl(
    directUrl,
    metadata.mimeType,
    metadata.language
  );
} else {
  operationLocation = await transcriptionService.transcribe(
    blobData.content,
    metadata.mimeType,
    metadata.language
  );
}

Transcription Workflow

  1. Service Selection: Choose appropriate transcription provider
  2. Format Detection: Automatic audio format analysis
  3. Language Detection: Auto-detect or use specified language
  4. Asynchronous Processing: Non-blocking transcription with polling
  5. Quality Assessment: Confidence scoring and validation
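
Step 4 (asynchronous processing with polling) might look like the sketch below. The `PollResult` shape, interval, and timeout are assumptions; only the timeout error message is taken from the handling code later in this stage:

```typescript
interface PollResult {
  status: "running" | "succeeded" | "failed";
  text?: string;
}

// Poll a transcription job until it completes or the deadline passes.
async function pollTranscription(
  check: () => Promise<PollResult>,
  intervalMs = 2000,
  timeoutMs = 10 * 60_000
): Promise<string> {
  const deadline = Date.now() + timeoutMs;
  while (Date.now() < deadline) {
    const result = await check();
    if (result.status === "succeeded") return result.text ?? "";
    if (result.status === "failed") throw new Error("Transcription failed");
    await new Promise((resolve) => setTimeout(resolve, intervalMs));
  }
  throw new Error("Timeout waiting for transcription result");
}
```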

AudioMetadata Processing

interface AudioMetadata {
  filename: string;     // Original filename
  originalname: string; // User-provided name
  mimeType: string;     // "audio/mpeg", "audio/wav", etc.
  language: string;     // "en-US", "es-ES", etc.
}

Notification Flow

ConversationNotificationType.TRANSCRIBE_START       // Transcription initiated
ConversationNotificationType.TRANSCRIBE_IN_PROGRESS // Processing ongoing
ConversationNotificationType.TRANSCRIBE             // Transcription completed
ConversationNotificationType.TRANSCRIPTION_ERROR    // Transcription failed

Timeout and Error Handling

try {
  transcription = await service.getTranscriptionResult(operationLocation);
} catch (error) {
  if (error.message.includes('Timeout waiting for transcription result')) {
    // Handle timeout gracefully - continue with an empty transcription
    transcription = '';
    // Notify client about the timeout
  } else {
    throw error; // Re-throw other errors
  }
}

Stage 5: Automation Rule Evaluation

Status: "Processing" - Automation Phase

Rule Matching Process

// Client-specific automation lookup
const automations = await this.automationService.getAutomationsByTypeAndOrigin(
  type,   // From processing config
  origin  // From processing config
);

// Additional client filtering happens in the service layer
const clientAutomations = automations.filter(automation =>
  automation.client_id === clientId && automation.is_active
);

Automation Configuration

interface Automation {
  id: number;
  name: string;      // "Meeting Analysis Rule"
  type: string;      // "audio"
  origin: string;    // "meeting_uploads"
  agents: string[];  // ["sentiment_agent", "summary_agent"]
  last_update: Date;
  is_active: boolean;
  client_id: string; // Client isolation
}

Processing Scenarios

  1. No Automations Found: Continue with transcription only
  2. Single Automation: Execute all assigned agents
  3. Multiple Automations: Process all matching rules
  4. Empty Transcription: Skip automation processing

Stage 6: Agent Execution

Status: "Running Flows"

Multi-Agent Processing Architecture

async processAutomations(
  automations: Automation[],
  transcription: string,
  conversationId: string,
  clientId: string,
  mediaKey: string,
  taskId: string
): Promise<{
  processedResults: ProcessedFlowResult[];
  failedResults: ProcessedFlowResult[];
  finalOutput: string;
}>

Agent Input Standardization

// Standard input format for all agents
const agentInput = {
  transcription,  // Complete transcribed text
  conversationId, // For context and tracking
  // Additional metadata available to agents
};

Agent Execution Loop

for (const automation of automations) {
  for (const flowId of automation.agents) {
    try {
      // Each agent executes in turn; a failure is isolated below
      const response = await this.langflowService.runAudioFlow(flowId, agentInput);

      // Extract text from the complex LangFlow response structure
      const rawText = this.extractTextFromResponse(response);

      if (rawText) {
        processedResults.push({
          flowId,
          text: rawText,
          timestamp: Date.now()
        });
      }
    } catch (error) {
      // Record the failure without aborting the remaining agents
      failedResults.push({
        flowId,
        error: error instanceof Error ? error.message : String(error),
        timestamp: Date.now()
      });
    }
  }
}

LangFlow Response Parsing

The service handles complex LangFlow response structures:

// Multiple possible response paths
if (response.data?.[0]?.results?.message?.text) {
  rawText = response.data[0].results.message.text;
} else if (response.data?.[0]?.outputs?.message?.message?.text) {
  rawText = response.data[0].outputs.message.message.text;
} else if (response.data?.outputs?.[0]?.outputs?.[0]?.results?.message?.data?.text) {
  rawText = response.data.outputs[0].outputs[0].results.message.data.text;
}

Agent Result Collection

interface ProcessedFlowResult {
  flowId: string;    // Agent identifier
  text?: string;     // Successful output
  timestamp: number; // Completion time
  error?: string;    // Error message if failed
}

interface TaskResults {
  flows: ProcessedFlowResult[];        // Successful executions
  failed_flows: ProcessedFlowResult[]; // Failed executions
  finalOutput: string;                 // Combined result or error summary
}

Error Isolation and Recovery

  • Individual Agent Failures: Don't affect other agents
  • Partial Success Handling: Continue with successful results
  • Error Aggregation: Collect all errors for debugging
  • Timeout Management: Per-agent timeout limits
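
Per-agent timeout limits can be enforced with a `Promise.race` wrapper along these lines. `withTimeout` is a hypothetical helper; the actual timeout mechanism is not shown in this document:

```typescript
// Race an agent's promise against a timer; whichever settles first wins.
async function withTimeout<T>(promise: Promise<T>, ms: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(
      () => reject(new Error(`Agent timed out after ${ms}ms`)),
      ms
    );
  });
  try {
    return await Promise.race([promise, timeout]);
  } finally {
    // Always clear the timer so it cannot keep the process alive.
    if (timer !== undefined) clearTimeout(timer);
  }
}
```

An agent call would then read `withTimeout(this.langflowService.runAudioFlow(flowId, agentInput), 60_000)`, with the rejection collected into `failed_flows`.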

Stage 7: Result Storage and Completion

Status: "Completed"

AutomationRunResult Creation

const runResult: AutomationRunResult = {
  id: uuidv4(),             // Unique result identifier
  file_reference: mediaKey, // Link to original media
  task_results: {
    flows: processedResults,     // Successful agent outputs
    failed_flows: failedResults, // Failed agent attempts
    finalOutput: combinedOutput  // Processed output or error summary
  },
  created_at: new Date()
};

// Store results for future access
await this.automationRunResults.insert(runResult);

Conversation Update

// Update conversation with all processing results
const updateData = {
  transcript: transcription,         // Always update the transcript
  runresultsreference: runResult.id, // Link to processing results
  custom_info: this.parseProcessingOutput(finalOutput) // Structured data
};

await this.conversationService.updateConversation(conversationId, updateData);

Custom Info Processing

// Parse agent output into structured data
if (finalOutput.includes('custom-info')) {
  updateData.custom_info = parseJsonDynamic(finalOutput) || { "": "" };
} else {
  updateData.custom_info = { "": "" }; // Default empty object
}
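
A minimal sketch of what `parseJsonDynamic` might do — pull the first embedded JSON object out of free-form agent output. This is an assumption about the helper's behavior, not its actual implementation:

```typescript
// Extract and parse the first {...} span in a string; return null
// when no parseable JSON object is present.
function parseJsonDynamic(output: string): Record<string, unknown> | null {
  const start = output.indexOf("{");
  const end = output.lastIndexOf("}");
  if (start === -1 || end <= start) return null;
  try {
    return JSON.parse(output.slice(start, end + 1));
  } catch {
    return null;
  }
}
```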

Notification System

Real-Time Status Updates

The pipeline provides comprehensive notifications throughout processing:

enum ConversationNotificationType {
  PENDING = "pending",
  PREPARE = "prepare",
  START = "start",
  DOWNLOAD = "download",
  TRANSCRIBE_START = "transcribe_start",
  TRANSCRIBE_IN_PROGRESS = "transcribe_in_progress",
  TRANSCRIBE = "transcribe",
  FETCH_AUTOMATIONS = "fetching-automations-up",
  AUTOMATION_PROCESSING = "automation-processing",
  RUNNING_FLOWS = "running - flows",
  SAVE_DB = "save-db",
  COMPLETED = "completed",

  // Error notifications
  FILE_UPLOAD_ERROR = "file-upload-error",
  TRANSCRIPTION_ERROR = "transcription-error",
  AUTOMATION_FETCH_ERROR = "automation-fetch-error",
  AUTOMATION_ERROR = "automation-error"
}

Notification Data Structure

interface ConversationProcessingNotificationData {
  type: ConversationNotificationType;
  status: EventStatus; // SUCCESS, FAIL, WARNING
  resourceId: string;  // mediaKey
  conversationId?: string;
  timestamp: Date;
  taskId?: string;
  extra?: {
    reason?: string;
    processing_completed?: boolean;
    is_processing?: boolean;
    error?: string;
    context?: string;
  };
}

Error Handling and Recovery

Comprehensive Error Management

type ConversationProcessingErrorType =
  | "fetch"                 // Database retrieval errors
  | "transcribe"            // Transcription service errors
  | "automation"            // Automation rule errors
  | "service-restart"       // System restart during processing
  | "capacity"              // Resource limitation errors
  | "download"              // File retrieval errors
  | "upload"                // File upload errors
  | "save_db"               // Database save errors
  | "running_flows"         // Agent execution errors
  | "automation_processing" // Automation processing errors
  | "aborted"               // User-initiated cancellation
  | "unhandled_error"       // Unexpected errors
  | "other";                // Miscellaneous errors

Error Recovery Strategies

Graceful Degradation

// Empty transcription handling
if (!transcription.trim()) {
  await this.conversationService.updateConversation(conversationId, {
    transcript: '',
    custom_info: {
      status: 'EMPTY_TRANSCRIPTION',
      user_message: "We couldn't detect any speech in this recording."
    }
  });
  // Complete the task successfully with a warning
}

Partial Failure Handling

// Automation error with successful transcription
try {
  await this.processAndSaveAutomations(/* ... */);
} catch (automationError) {
  await this.conversationService.updateConversation(conversationId, {
    transcript: transcription, // Keep the successful transcription
    custom_info: {
      error: 'Automation processing failed',
      status: 'AUTOMATION_ERROR',
      user_message: 'Transcript is available, but analysis failed.'
    }
  });
  // Complete with a warning status
}

Task Lifecycle Management

Boot Recovery

// Handle tasks interrupted by service restart
private async updateStatusOnBoot() {
const interruptedTasks = await this.conversationProcessingStore
.getProcessingByStatus(TaskStatus.PROCESSING);

// Mark as failed with service-restart error
const promises = interruptedTasks.map(task =>
this.conversationProcessingStore.updateConversationProcessingStatus(
task.processing_id, {
status: TaskStatus.FAILED,
error: {
message: 'Service restarted during processing',
type: 'service-restart'
}
}
)
);
}

Queue Cleanup

// Clean abandoned tasks on startup
private async cleanupQueue() {
  const tasksCount = await this.taskManager.cleanupQueueByType("CONVERSATION_PROCESSING");
  logger.debug(`Cleaned up ${tasksCount} tasks from queue`);
}

Performance and Scalability

Concurrency Control

// Configurable concurrent processing
private maxConcurrentTasks: number = 1;
private activeTasksCount: number = 0;

// Dynamic concurrency adjustment
setMaxConcurrentTasks(value: number) {
  this.maxConcurrentTasks = Math.max(1, value);
}
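
The admission logic implied by these two counters can be sketched as a small gate. `TaskGate` is an illustrative name; the field names mirror the snippet above, but the real dispatch loop is not shown here:

```typescript
// Admit a task only while the active count is below the limit;
// callers pair each successful tryStart() with a finish().
class TaskGate {
  constructor(
    private maxConcurrentTasks = 1,
    private activeTasksCount = 0
  ) {}

  tryStart(): boolean {
    if (this.activeTasksCount >= this.maxConcurrentTasks) return false;
    this.activeTasksCount++;
    return true;
  }

  finish(): void {
    this.activeTasksCount = Math.max(0, this.activeTasksCount - 1);
  }
}
```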

Resource Management

  • Memory Optimization: Stream processing for large files
  • Storage Efficiency: Temporary URL generation for direct access
  • Network Optimization: Choose best download method per file
  • Database Optimization: Batch operations and proper indexing

Monitoring and Observability

// Performance timing
console.time(`conversation-processing-${mediaKey}`);
// ... processing ...
console.timeEnd(`conversation-processing-${mediaKey}`);

// Detailed logging at each stage
logger.debug(`Starting task: ${task.id} for media: ${mediaKey} (${activeTasksCount}/${maxConcurrentTasks})`);

Integration Points

Storage Backends

  • Multi-provider Support: Azure, S3, local storage
  • Automatic Selection: Based on blob_source configuration
  • Fallback Mechanisms: Graceful degradation between storage types

Transcription Services

  • Provider Abstraction: Pluggable transcription services
  • Capability Detection: URL vs blob transcription support
  • Quality Optimization: Best method selection per service
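
Taken together, the Stage 4 calls (`supportsUrlTranscription`, `transcribeFromUrl`, `transcribe`, `getTranscriptionResult`) suggest a provider interface along these lines. This is a reconstruction from the calls shown earlier, not the actual declaration, and the stub implementation is purely illustrative:

```typescript
interface TranscriptionService {
  supportsUrlTranscription(): boolean;
  transcribeFromUrl(url: string, mimeType: string, language: string): Promise<string>;
  transcribe(content: Uint8Array, mimeType: string, language: string): Promise<string>;
  getTranscriptionResult(operationLocation: string): Promise<string>;
}

// A blob-only stub: capability detection steers the pipeline away from
// URL transcription for providers that don't support it.
class StubTranscriptionService implements TranscriptionService {
  supportsUrlTranscription(): boolean {
    return false;
  }
  async transcribeFromUrl(): Promise<string> {
    throw new Error("URL transcription not supported");
  }
  async transcribe(): Promise<string> {
    return "op-location"; // Would be the provider's operation location
  }
  async getTranscriptionResult(): Promise<string> {
    return ""; // Would be the finished transcript
  }
}
```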

LangFlow Integration

  • Agent Orchestration: Multiple agent execution per automation
  • Response Parsing: Robust output extraction from complex responses
  • Error Isolation: Individual agent failure handling

The Voice Features processing pipeline provides enterprise-grade reliability with comprehensive error handling, real-time monitoring, and graceful degradation for all failure scenarios.